perf(storage): reduce multipart upload copying#2863

Open
ValentaTomas wants to merge 7 commits into
mainfrom
valenta/storage-upload-copy-reduction

Conversation

@ValentaTomas
Member

Summary

Remove multipart Content-MD5 hashing and reduce compression/upload allocations by pooling frame input buffers and using a custom multi-slice upload reader.

Test plan

  • go test ./packages/shared/pkg/storage

@cla-bot cla-bot Bot added the cla-signed label May 29, 2026
@cursor

cursor Bot commented May 29, 2026

PR Summary

Medium Risk
Changes hot-path compression buffering and GCS multipart request bodies; Content-MD5 is still required for transport integrity, so behavior should stay equivalent if MD5 and reader replay stay correct.

Overview
The PR description says multipart Content-MD5 hashing was removed, but uploadPart and uploadPartSlices still set Content-MD5 (now via md5.Sum / a per-request hasher over slices). That wording will mislead reviewers who expect no MD5 on the wire.

Compression now pools frame read buffers in a shared sync.Pool, hands *[]byte into async addFrame, and returns buffers after each frame compresses; read failures and zero-byte reads return buffers on the read path. GCS slice uploads use a new multiSliceReader instead of io.MultiReader plus per-slice bytes.NewReader, so retries replay slices without building intermediate readers. Tests still assert Content-MD5 against the uploaded body.
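
As an illustration of the pooling described above, the following sketch shows a `sync.Pool` that stores `*[]byte` values. The names `newFramePool` and `frameSize` and the 2 MiB size are assumptions made for this example only; the PR's own pool setup may differ.

```go
package main

import (
	"fmt"
	"sync"
)

const frameSize = 2 << 20 // assumed frame size for illustration (2 MiB)

// newFramePool returns a pool of *[]byte. Storing a pointer avoids the extra
// allocation that boxing a slice header into an interface value would cause.
func newFramePool(size int) *sync.Pool {
	return &sync.Pool{New: func() any {
		buf := make([]byte, size)
		return &buf
	}}
}

func main() {
	pool := newFramePool(frameSize)

	bufPtr := pool.Get().(*[]byte)
	buf := (*bufPtr)[:frameSize] // reslice to full length before reading into it
	n := copy(buf, "frame payload")
	fmt.Println(n, len(buf))

	pool.Put(bufPtr) // return only once nothing references buf any more
}
```

The important rule is the last comment: a buffer must not go back to the pool while a compressor or uploader can still read from it.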

Reviewed by Cursor Bugbot for commit 13a9043.

@codecov

codecov Bot commented May 29, 2026

❌ 3 Tests Failed:

| Tests completed | Failed | Passed | Skipped |
| --- | --- | --- | --- |
| 2705 | 3 | 2702 | 5 |
View the full list of 3 ❄️ flaky test(s)
github.com/e2b-dev/infra/tests/integration/internal/tests/api/sandboxes::TestSandboxListPaginationRunningLargerLimit

Flake rate in main: 42.80% (Passed 747 times, Failed 559 times)

Stack Traces | 90.7s run time
=== RUN   TestSandboxListPaginationRunningLargerLimit
    sandbox_list_test.go:327: Created sandbox 1/12: ioq9bn2yg9fyrhymewi9r
    sandbox_list_test.go:327: Created sandbox 2/12: ignv35z2zjcz9uofy4x96
    sandbox_list_test.go:327: Created sandbox 3/12: iutsdofb1si9la2ga8u3f
    sandbox_list_test.go:327: Created sandbox 4/12: isxt5hht0tya0b6y1epk1
    sandbox_list_test.go:327: Created sandbox 5/12: idjv4y4m9613pe2uztnm3
    sandbox_list_test.go:327: Created sandbox 6/12: ize5gamwq9804hddvumg5
    sandbox_list_test.go:327: Created sandbox 7/12: i8wydwa1s9k1f4nj0ok6b
    sandbox_list_test.go:327: Created sandbox 8/12: inaxr79i7m6ndy5xwe4e5
    sandbox_list_test.go:327: Created sandbox 9/12: i17qwvq9dg1zex7aa2zoo
    sandbox_list_test.go:327: Created sandbox 10/12: i61dq31p00v689v8g6su7
    sandbox_list_test.go:327: Created sandbox 11/12: iga0c3co7za2v1fucuetu
    sandbox_list_test.go:327: Created sandbox 12/12: i4nxv40jnbz3njuhi2iih
    sandbox_list_test.go:330: 
        	Error Trace:	.../api/sandboxes/sandbox_list_test.go:340
        	            				.../hostedtoolcache/go/1.26.3.../src/runtime/asm_amd64.s:1771
        	Error:      	"[]" should have 12 item(s), but has 0
    sandbox_list_test.go:330: 
        	Error Trace:	.../api/sandboxes/sandbox_list_test.go:330
        	Error:      	Condition never satisfied
        	Test:       	TestSandboxListPaginationRunningLargerLimit
--- FAIL: TestSandboxListPaginationRunningLargerLimit (90.73s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity

Flake rate in main: 57.71% (Passed 740 times, Failed 1010 times)

Stack Traces | 63.3s run time
=== RUN   TestSandboxMemoryIntegrity
=== PAUSE TestSandboxMemoryIntegrity
=== CONT  TestSandboxMemoryIntegrity
    sandbox_memory_integrity_test.go:27: Build completed successfully
--- FAIL: TestSandboxMemoryIntegrity (63.28s)
github.com/e2b-dev/infra/tests/integration/internal/tests/orchestrator::TestSandboxMemoryIntegrity/tmpfs_hash

Flake rate in main: 57.83% (Passed 730 times, Failed 1001 times)

Stack Traces | 238s run time
=== RUN   TestSandboxMemoryIntegrity/tmpfs_hash
=== PAUSE TestSandboxMemoryIntegrity/tmpfs_hash
=== CONT  TestSandboxMemoryIntegrity/tmpfs_hash
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{start:{pid:1259}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Total memory: 985 MB\nUsed memory before tmpfs mount: 186 MB\nFree memory before tmpfs mount: 797 MB\nMemory to use in integrity test (60% of free, min 64MB): 478 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"478+0 records in\n478+0 records out\n501219328 bytes (501 MB, 478 MiB) copied, 1.90469 s, 263 MB/s\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\tCommand being timed: \""}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"dd if=/dev/urandom of=/mnt/testfile bs=1M count=478\"\n\tUser time (seconds): "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"0.00\n\tSys"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"tem time ("}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"secon"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ds): "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"1.89\n\tPerc"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ent o"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"f CPU t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"his j"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ob go"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t: 99%"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n\tEla"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"psed (w"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"all clo"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ck) t"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ime (h"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:":mm:ss"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" or m:ss"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"): 0:01.90"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n\tAve"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"rage "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"share"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"d text s"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ize (k"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"byte"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"s): 0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n\tAvera"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ge un"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"share"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"d data "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"size "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"(kbytes"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"): 0\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\tAve"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"rage "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"stack"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" size ("}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"kbytes):"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" 0\n\tA"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"verage"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" tota"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"l size (k"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"bytes):"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" 0\n\tMa"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ximum "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"resident set siz"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"e (kbyt"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"es): 2680"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n\tAver"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"age re"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"sident se"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"t size"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" (kbyt"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"es): 0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"\n\tMajo"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"r (requir"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ing I"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"/O) pa"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ge fa"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ults: "}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"2\n\tMinor"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" (reclai"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"ming a"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:" frame"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:") page fau"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stderr:"lts: 345\n\tVoluntary context switches: 3\n\tInvoluntary context switches: 14\n\tSwaps: 0\n\tFile system inputs: 176\n\tFile system outputs: 0\n\tSocket messages sent: 0\n\tSocket messages received: 0\n\tSignals delivered: 0\n\tPage size (bytes): 4096\n\tExit status: 0\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{data:{stdout:"Used memory after tmpfs mount and file fill: 669 MB\n"}}
    sandbox_memory_integrity_test.go:70: Command [bash] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_memory_integrity_test.go:70: Command [bash] completed successfully in sandbox iwxlrsx0nz1subtv4uyk8
Executing command bash in sandbox iwxlrsx0nz1subtv4uyk8 (user: root)
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{start:{pid:1275}}
Executing command bash in sandbox if9k5coxz1u9w2q85j3no (user: root)
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{data:{stdout:"ff43923001d5184becc53330e55cf5747c3e33780ec80c6b9ee71919ab552b76\n"}}
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_memory_integrity_test.go:80: Command [bash] completed successfully in sandbox iwxlrsx0nz1subtv4uyk8
Executing command bash in sandbox if9k5coxz1u9w2q85j3no (user: root)
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{start:{pid:1278}}
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{data:{stdout:"ff43923001d5184becc53330e55cf5747c3e33780ec80c6b9ee71919ab552b76\n"}}
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{end:{exited:true  status:"exit status 0"}}
    sandbox_memory_integrity_test.go:80: Command [bash] completed successfully in sandbox iwxlrsx0nz1subtv4uyk8
Executing command bash in sandbox iwxlrsx0nz1subtv4uyk8 (user: root)
    sandbox_memory_integrity_test.go:80: Command [bash] output: event:{start:{pid:1282}}
Executing command bash in sandbox iwxlrsx0nz1subtv4uyk8 (user: root)
[preceding line repeated 82 more times]
    sandbox_memory_integrity_test.go:110: 
        	Error Trace:	.../tests/orchestrator/sandbox_memory_integrity_test.go:81
        	            				.../hostedtoolcache/go/1.26.3.../src/runtime/asm_amd64.s:1771
        	Error:      	Received unexpected error:
        	            	failed to execute command bash in sandbox iwxlrsx0nz1subtv4uyk8: unavailable: HTTP status 502 Bad Gateway
    sandbox_memory_integrity_test.go:110: 
        	Error Trace:	.../tests/orchestrator/sandbox_memory_integrity_test.go:78
        	            				.../tests/orchestrator/sandbox_memory_integrity_test.go:110
        	Error:      	Condition never satisfied
        	Test:       	TestSandboxMemoryIntegrity/tmpfs_hash
--- FAIL: TestSandboxMemoryIntegrity/tmpfs_hash (238.12s)


Contributor

@gemini-code-assist gemini-code-assist Bot left a comment


Code Review

The Read method of multiSliceReader violates the io.Reader contract by returning 0, io.EOF when called with a zero-length slice even if data remains to be read, which can cause premature EOF errors.

Comment thread packages/shared/pkg/storage/gcp_multipart.go
@ValentaTomas ValentaTomas marked this pull request as ready for review May 29, 2026 21:47
levb previously requested changes May 30, 2026
Contributor

@levb levb left a comment


See #2869

Move buffer ownership into the compression goroutine: the goroutine that
reads `bufPtr` also returns it to `inputPool` via defer when it exits.
Drops `frame.input`, `part.releaseInputBuffers`, and the explicit
release calls in the read/upload loops.

The previous checkpoint design held every part's input buffers until
`p.compress.Wait()` returned in the upload loop, so buffers stayed pinned
across the GCS upload latency. The pool kept missing and calling New,
leaving allocations on par with or worse than no pool at all. Now,
per-goroutine release caps the pool's working set at workers+1
regardless of stream length, so allocations and peak heap actually
shrink.
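
The ownership change can be sketched in a reduced form as follows. This is not the branch's `compressStream`: `compressAll` is a hypothetical function, a channel semaphore stands in for `errgroup.SetLimit` so the example needs only the standard library, and recording the frame length stands in for the real compressor call. It tracks how many pooled buffers are live at once to show the workers+1 bound.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// compressAll is a hypothetical reduction of the read loop: it copies each
// frame into a pooled buffer and hands it to a worker that owns the buffer
// until it exits. A channel semaphore stands in for errgroup.SetLimit.
func compressAll(frames [][]byte, frameSize, workers int) (sizes []int, peakLive int64) {
	pool := &sync.Pool{New: func() any {
		buf := make([]byte, frameSize)
		return &buf
	}}
	sizes = make([]int, len(frames))
	sem := make(chan struct{}, workers)
	var wg sync.WaitGroup
	var live, peak atomic.Int64

	for i, f := range frames {
		bufPtr := pool.Get().(*[]byte)
		if n := live.Add(1); n > peak.Load() {
			peak.Store(n) // only this loop writes peak, so no CAS is needed
		}
		n := copy((*bufPtr)[:frameSize], f)

		sem <- struct{}{} // blocks while all workers are busy
		wg.Add(1)
		go func(i int, data []byte) {
			defer wg.Done()
			defer func() { <-sem }()
			// The worker returns its own buffer; nothing waits for the upload.
			defer func() { live.Add(-1); pool.Put(bufPtr) }()
			sizes[i] = len(data) // placeholder for the real compressor call
		}(i, (*bufPtr)[:n])
	}
	wg.Wait()
	return sizes, peak.Load()
}

func main() {
	frames := make([][]byte, 64)
	for i := range frames {
		frames[i] = make([]byte, 1024)
	}
	sizes, peak := compressAll(frames, 1024, 4)
	fmt.Println(len(sizes), peak <= 5) // 64 true
}
```

Because each worker releases its buffer before freeing its slot, at most `workers` buffers are held by workers and one by the reader, independent of how many frames the stream contains.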

Adds compress_upload_pool_demo_test.go (delete before merge) that
mirrors the pre-PR-2863 and PR #2863 designs verbatim and runs them
side-by-side with the real compressStream from this branch on 256 MiB
and 1 GiB inputs with a simulated 500 ms per-part upload latency.

```
go test -run TestPoolLifecycleDemo -v -timeout=10m -count=1 ./packages/shared/pkg/storage/ 2>&1
=== RUN   TestPoolLifecycleDemo
=== RUN   TestPoolLifecycleDemo/256MiB
    compress_upload_pool_demo_test.go:473: input=256 MiB, frame=2048 KiB, part=50 MiB, workers=4, upload_delay=500ms
    compress_upload_pool_demo_test.go:524: main         (no pool)         : total_alloc=  668.0 MiB  mallocs=  2037  heap_inuse_after=  837.9 MiB
    compress_upload_pool_demo_test.go:524: tomas (PR #2863, checkpoint)  : total_alloc=  689.5 MiB  mallocs=  2281  heap_inuse_after=  953.7 MiB
    compress_upload_pool_demo_test.go:524: this branch (per-goroutine)   : total_alloc=  370.2 MiB  mallocs=  1430  heap_inuse_after=  638.6 MiB
    compress_upload_pool_demo_test.go:527: ---
    compress_upload_pool_demo_test.go:528: tomas vs main:  total_alloc   +21.6 MiB  mallocs  +244  heap_inuse  +115.9 MiB
    compress_upload_pool_demo_test.go:532: branch vs main: total_alloc  -297.8 MiB  mallocs  -607  heap_inuse  -199.2 MiB
    compress_upload_pool_demo_test.go:536: branch vs tomas:total_alloc  -319.4 MiB  mallocs  -851  heap_inuse  -315.1 MiB
=== RUN   TestPoolLifecycleDemo/1024MiB
    compress_upload_pool_demo_test.go:473: input=1024 MiB, frame=2048 KiB, part=50 MiB, workers=4, upload_delay=500ms
    compress_upload_pool_demo_test.go:524: main         (no pool)         : total_alloc= 2341.9 MiB  mallocs=  4794  heap_inuse_after= 2986.0 MiB
    compress_upload_pool_demo_test.go:524: tomas (PR #2863, checkpoint)  : total_alloc= 2417.5 MiB  mallocs=  5895  heap_inuse_after= 3462.5 MiB
    compress_upload_pool_demo_test.go:524: this branch (per-goroutine)   : total_alloc= 1334.3 MiB  mallocs=  4367  heap_inuse_after= 2396.1 MiB
    compress_upload_pool_demo_test.go:527: ---
    compress_upload_pool_demo_test.go:528: tomas vs main:  total_alloc   +75.6 MiB  mallocs +1101  heap_inuse  +476.5 MiB
    compress_upload_pool_demo_test.go:532: branch vs main: total_alloc -1007.7 MiB  mallocs  -427  heap_inuse  -589.9 MiB
    compress_upload_pool_demo_test.go:536: branch vs tomas:total_alloc -1083.3 MiB  mallocs -1528  heap_inuse -1066.4 MiB
--- PASS: TestPoolLifecycleDemo (6.48s)
    --- PASS: TestPoolLifecycleDemo/256MiB (2.20s)
    --- PASS: TestPoolLifecycleDemo/1024MiB (4.29s)
PASS
ok  	github.com/e2b-dev/infra/packages/shared/pkg/storage	6.956s
```
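
The demo reports runtime memstats deltas. A minimal sketch of collecting such figures is shown below; `allocDelta` and `sink` are hypothetical names, and the demo test's own measurement code may differ (for example in when it triggers GC).

```go
package main

import (
	"fmt"
	"runtime"
)

// allocDelta is a hypothetical helper: it runs fn and reports the bytes and
// heap objects allocated while it ran, plus HeapInuse afterwards.
func allocDelta(fn func()) (bytes, mallocs, heapInuse uint64) {
	var before, after runtime.MemStats
	runtime.GC() // start from a collected heap so HeapInuse readings are comparable
	runtime.ReadMemStats(&before)
	fn()
	runtime.ReadMemStats(&after)
	// TotalAlloc and Mallocs are cumulative counters, so subtraction is safe.
	return after.TotalAlloc - before.TotalAlloc, after.Mallocs - before.Mallocs, after.HeapInuse
}

var sink []byte // package-level sink keeps the allocations from being optimized away

func main() {
	bytes, mallocs, inuse := allocDelta(func() {
		for i := 0; i < 8; i++ {
			sink = make([]byte, 1<<20)
		}
	})
	fmt.Printf("total_alloc=%.1f MiB mallocs=%d heap_inuse_after=%.1f MiB\n",
		float64(bytes)/(1<<20), mallocs, float64(inuse)/(1<<20))
}
```

Exact numbers vary by Go version and platform, so comparisons are only meaningful between runs made in the same process and configuration, as in the side-by-side output above.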

@cursor cursor Bot left a comment


Cursor Bugbot has reviewed your changes and found 1 potential issue.

Bugbot Autofix prepared a fix for the issue found in the latest run.

  • ✅ Fixed: Temporary demo test still present
    • Deleted the demo test file as instructed by its header comment to prevent unnecessary CI time and remove duplicate compression implementations.

Or push these changes by commenting:

@cursor push b8d7ead894
Preview (b8d7ead894)
diff --git a/packages/shared/pkg/storage/compress_upload_pool_demo_test.go b/packages/shared/pkg/storage/compress_upload_pool_demo_test.go
deleted file mode 100644
--- a/packages/shared/pkg/storage/compress_upload_pool_demo_test.go
+++ /dev/null
@@ -1,542 +1,0 @@
-// Demo test: runs the production compressStream from this branch
-// (per-goroutine buffer release) side-by-side with a verbatim mirror of
-// the PR #2863 design (frame.input + releaseInputBuffers checkpoint).
-//
-// Both sides see the same input, same CompressConfig, same uploader, same
-// real zstd compressor pool. We report runtime memstats deltas so the
-// buffer-lifecycle difference shows up as a concrete byte count.
-//
-// Illustration file — delete before merging the PR.
-package storage
-
-import (
-	"bytes"
-	"context"
-	"crypto/sha256"
-	"errors"
-	"fmt"
-	"io"
-	"math/rand"
-	"runtime"
-	"sync"
-	"sync/atomic"
-	"testing"
-	"time"
-
-	"golang.org/x/sync/errgroup"
-)
-
-// ============================================================================
-// Verbatim mirror of compress_upload.go from PR #2863 (commit ef2d92fa1).
-// Renamed with `old` prefix to coexist with the production code in this branch.
-// Only behavioral changes: none. Only syntactic: identifier renames.
-// ============================================================================
-
-type oldFrame struct {
-	uncompressedSize int
-	compressed       []byte
-	input            *[]byte
-}
-
-type oldPart struct {
-	index          int
-	frames         []*oldFrame
-	compressedSize atomic.Int64
-	compress       *errgroup.Group
-	inputPool      *sync.Pool
-}
-
-func oldNewPart(index int, parentCtx context.Context, workers int, inputPool *sync.Pool) (*oldPart, context.Context) {
-	p := &oldPart{index: index, inputPool: inputPool}
-	var ctx context.Context
-	p.compress, ctx = errgroup.WithContext(parentCtx)
-	p.compress.SetLimit(workers)
-
-	return p, ctx
-}
-
-func (p *oldPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) {
-	frameInPart := &oldFrame{uncompressedSize: len(uncompressedData)}
-	p.frames = append(p.frames, frameInPart)
-
-	p.compress.Go(func() error {
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-		c := pool.Get().(compressor)
-		out, err := c.compress(uncompressedData)
-		pool.Put(c)
-		if err != nil {
-			return err
-		}
-		frameInPart.compressed = out
-		p.compressedSize.Add(int64(len(out)))
-
-		return nil
-	})
-}
-
-func (p *oldPart) releaseInputBuffers() {
-	for _, f := range p.frames {
-		if f.input != nil {
-			p.inputPool.Put(f.input)
-			f.input = nil
-		}
-	}
-}
-
-func oldCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-
-	if err := uploader.Start(ctx); err != nil {
-		return nil, [32]byte{}, fmt.Errorf("start upload: %w", err)
-	}
-	defer uploader.Close()
-
-	if maxUploadConcurrency < 1 {
-		maxUploadConcurrency = 1
-	}
-	work, workCtx := errgroup.WithContext(ctx)
-	work.SetLimit(maxUploadConcurrency + 1)
-
-	q := make(chan *oldPart, 2)
-	hasher := sha256.New()
-	work.Go(func() error {
-		defer close(q)
-
-		return oldReadLoop(workCtx, in, cfg, hasher, q)
-	})
-
-	var frameSizes []FrameSize
-	var cOffset int64
-	var loopErr error
-	for p := range q {
-		err := p.compress.Wait()
-		p.releaseInputBuffers()
-		if err != nil {
-			loopErr = fmt.Errorf("compress frames: %w", err)
-			cancel()
-
-			break
-		}
-
-		var compressed [][]byte
-		for _, f := range p.frames {
-			frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))})
-			compressed = append(compressed, f.compressed)
-			if sink != nil {
-				sink(ctx, cOffset, f.compressed)
-			}
-			cOffset += int64(len(f.compressed))
-		}
-
-		pi := p.index
-		work.Go(func() error {
-			return uploader.UploadPart(workCtx, pi, compressed...)
-		})
-	}
-
-	for range q { //nolint:revive // intentional drain
-	}
-	workErr := work.Wait()
-
-	if err := errors.Join(loopErr, workErr); err != nil {
-		return nil, [32]byte{}, err
-	}
-
-	if err := uploader.Complete(ctx); err != nil {
-		return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err)
-	}
-
-	ft := NewFrameTable(cfg.CompressionType(), frameSizes)
-
-	return ft, sum256(hasher), nil
-}
-
-func oldReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *oldPart) error {
-	compressors, err := newCompressorPool(cfg)
-	if err != nil {
-		return err
-	}
-
-	frameSize := cfg.FrameSize()
-	minPartSize := cfg.MinPartSize()
-	workers := max(cfg.FrameEncodeWorkers, 1)
-	inputPool := &sync.Pool{
-		New: func() any {
-			buf := make([]byte, frameSize)
-
-			return &buf
-		},
-	}
-	p, compressCtx := oldNewPart(1, ctx, workers, inputPool)
-
-	for {
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-
-		bufPtr := inputPool.Get().(*[]byte)
-		buf := (*bufPtr)[:frameSize]
-		n, err := io.ReadFull(in, buf)
-
-		eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
-		if err != nil && !eof {
-			inputPool.Put(bufPtr)
-
-			return fmt.Errorf("read frame: %w", err)
-		}
-
-		if n > 0 {
-			hasher.Write(buf[:n])
-			p.addFrame(compressCtx, buf[:n], compressors)
-			p.frames[len(p.frames)-1].input = bufPtr
-		} else {
-			inputPool.Put(bufPtr)
-		}
-
-		if eof {
-			if len(p.frames) > 0 {
-				select {
-				case q <- p:
-				case <-ctx.Done():
-					p.releaseInputBuffers()
-
-					return ctx.Err()
-				}
-			}
-
-			return nil
-		}
-
-		if p.compressedSize.Load() >= minPartSize {
-			select {
-			case q <- p:
-			case <-ctx.Done():
-				p.releaseInputBuffers()
-
-				return ctx.Err()
-			}
-			p, compressCtx = oldNewPart(p.index+1, ctx, workers, inputPool)
-		}
-	}
-}
-
-// ============================================================================
-// MAIN form — pre-PR #2863 design. No input pool at all: each frame allocates
-// a fresh `make([]byte, frameSize)` per iteration and relies on GC.
-// Mirrors compress_upload.go from immediately before commit 5b4ac5378.
-// ============================================================================
-
-type mainFrame struct {
-	uncompressedSize int
-	compressed       []byte
-}
-
-type mainPart struct {
-	index          int
-	frames         []*mainFrame
-	compressedSize atomic.Int64
-	compress       *errgroup.Group
-}
-
-func mainNewPart(index int, parentCtx context.Context, workers int) (*mainPart, context.Context) {
-	p := &mainPart{index: index}
-	var ctx context.Context
-	p.compress, ctx = errgroup.WithContext(parentCtx)
-	p.compress.SetLimit(workers)
-
-	return p, ctx
-}
-
-func (p *mainPart) addFrame(ctx context.Context, uncompressedData []byte, pool *sync.Pool) {
-	frameInPart := &mainFrame{uncompressedSize: len(uncompressedData)}
-	p.frames = append(p.frames, frameInPart)
-	p.compress.Go(func() error {
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-		c := pool.Get().(compressor)
-		out, err := c.compress(uncompressedData)
-		pool.Put(c)
-		if err != nil {
-			return err
-		}
-		frameInPart.compressed = out
-		p.compressedSize.Add(int64(len(out)))
-
-		return nil
-	})
-}
-
-func mainCompressStream(ctx context.Context, in io.Reader, cfg CompressConfig, uploader partUploader, maxUploadConcurrency int, sink FrameSink) (*FrameTable, [32]byte, error) {
-	ctx, cancel := context.WithCancel(ctx)
-	defer cancel()
-	if err := uploader.Start(ctx); err != nil {
-		return nil, [32]byte{}, fmt.Errorf("start upload: %w", err)
-	}
-	defer uploader.Close()
-	if maxUploadConcurrency < 1 {
-		maxUploadConcurrency = 1
-	}
-	work, workCtx := errgroup.WithContext(ctx)
-	work.SetLimit(maxUploadConcurrency + 1)
-	q := make(chan *mainPart, 2)
-	hasher := sha256.New()
-	work.Go(func() error {
-		defer close(q)
-
-		return mainReadLoop(workCtx, in, cfg, hasher, q)
-	})
-	var frameSizes []FrameSize
-	var cOffset int64
-	var loopErr error
-	for p := range q {
-		if err := p.compress.Wait(); err != nil {
-			loopErr = fmt.Errorf("compress frames: %w", err)
-			cancel()
-
-			break
-		}
-		var compressed [][]byte
-		for _, f := range p.frames {
-			frameSizes = append(frameSizes, FrameSize{U: int32(f.uncompressedSize), C: int32(len(f.compressed))})
-			compressed = append(compressed, f.compressed)
-			if sink != nil {
-				sink(ctx, cOffset, f.compressed)
-			}
-			cOffset += int64(len(f.compressed))
-		}
-		pi := p.index
-		work.Go(func() error {
-			return uploader.UploadPart(workCtx, pi, compressed...)
-		})
-	}
-	for range q { //nolint:revive // intentional drain
-	}
-	workErr := work.Wait()
-	if err := errors.Join(loopErr, workErr); err != nil {
-		return nil, [32]byte{}, err
-	}
-	if err := uploader.Complete(ctx); err != nil {
-		return nil, [32]byte{}, fmt.Errorf("complete upload: %w", err)
-	}
-	ft := NewFrameTable(cfg.CompressionType(), frameSizes)
-
-	return ft, sum256(hasher), nil
-}
-
-func mainReadLoop(ctx context.Context, in io.Reader, cfg CompressConfig, hasher io.Writer, q chan<- *mainPart) error {
-	compressors, err := newCompressorPool(cfg)
-	if err != nil {
-		return err
-	}
-	frameSize := cfg.FrameSize()
-	minPartSize := cfg.MinPartSize()
-	workers := max(cfg.FrameEncodeWorkers, 1)
-	p, compressCtx := mainNewPart(1, ctx, workers)
-	for {
-		if err := ctx.Err(); err != nil {
-			return err
-		}
-		buf := make([]byte, frameSize) // fresh allocation every frame, no pool
-		n, err := io.ReadFull(in, buf)
-		eof := errors.Is(err, io.EOF) || errors.Is(err, io.ErrUnexpectedEOF)
-		if err != nil && !eof {
-			return fmt.Errorf("read frame: %w", err)
-		}
-		if n > 0 {
-			hasher.Write(buf[:n])
-			p.addFrame(compressCtx, buf[:n], compressors)
-		}
-		if eof {
-			if len(p.frames) > 0 {
-				select {
-				case q <- p:
-				case <-ctx.Done():
-					return ctx.Err()
-				}
-			}
-
-			return nil
-		}
-		if p.compressedSize.Load() >= minPartSize {
-			select {
-			case q <- p:
-			case <-ctx.Done():
-				return ctx.Err()
-			}
-			p, compressCtx = mainNewPart(p.index+1, ctx, workers)
-		}
-	}
-}
-
-// ============================================================================
-// Shared harness.
-// ============================================================================
-
-// demoBuildInput produces deterministic, mildly-compressible data so zstd
-// runs with realistic timing instead of degenerate fast-path on zeros.
-func demoBuildInput(bytesTotal int) []byte {
-	out := make([]byte, bytesTotal)
-	r := rand.New(rand.NewSource(0xCAFEBABE))
-	const blockSz = 4096
-	for i := 0; i < bytesTotal; i += blockSz {
-		end := min(i+blockSz, bytesTotal)
-		// repeat a small random block several times to give zstd something to find
-		seed := make([]byte, 64)
-		r.Read(seed)
-		for j := i; j < end; j++ {
-			out[j] = seed[(j-i)%len(seed)]
-		}
-	}
-
-	return out
-}
-
-// slowUploader wraps memPartUploader and adds a fixed per-part upload delay
-// to simulate GCS multipart upload latency. A 50 MiB part to GCS typically
-// takes 300-800 ms in-region; we use 500 ms as a representative figure.
-type slowUploader struct {
-	inner     memPartUploader
-	partDelay time.Duration
-}
-
-func (s *slowUploader) Start(ctx context.Context) error { return s.inner.Start(ctx) }
-func (s *slowUploader) UploadPart(ctx context.Context, partIndex int, data ...[]byte) error {
-	select {
-	case <-time.After(s.partDelay):
-	case <-ctx.Done():
-		return ctx.Err()
-	}
-
-	return s.inner.UploadPart(ctx, partIndex, data...)
-}
-func (s *slowUploader) Complete(ctx context.Context) error { return s.inner.Complete(ctx) }
-func (s *slowUploader) Close() error                       { return s.inner.Close() }
-func (s *slowUploader) Assemble() []byte                   { return s.inner.Assemble() }
-
-func demoCfg() CompressConfig {
-	return CompressConfig{
-		Enabled:            true,
-		Type:               "zstd",
-		Level:              1, // fastest
-		FrameSizeKB:        2048,
-		MinPartSizeMB:      50,
-		FrameEncodeWorkers: 4,
-		EncoderConcurrency: 0,
-	}
-}
-
-type demoStats struct {
-	totalAllocBytes uint64
-	mallocs         uint64
-	heapInUseAfter  uint64
-}
-
-func demoMeasure(b func()) demoStats {
-	// Explicit GCs isolate this variant's allocation count from previous
-	// variants' residue. Two consecutive GCs let the runtime clear any
-	// pending finalizer-held memory.
-	runtime.GC() //nolint:revive // intentional for measurement isolation
-	runtime.GC() //nolint:revive // intentional for measurement isolation
-	var before, after runtime.MemStats
-	runtime.ReadMemStats(&before)
-	b()
-	runtime.ReadMemStats(&after)
-
-	return demoStats{
-		totalAllocBytes: after.TotalAlloc - before.TotalAlloc,
-		mallocs:         after.Mallocs - before.Mallocs,
-		heapInUseAfter:  after.HeapInuse,
-	}
-}
-
-// TestPoolLifecycleDemo runs three designs side-by-side and reports the memory
-// cost. Single run, real zstd compression, simulated GCS upload latency.
-//
-//   - main:        pre-PR-2863, no pool (fresh `make([]byte, frameSize)` per frame).
-//   - tomas:       PR #2863, pool + frame.input + releaseInputBuffers checkpoint.
-//   - this branch: pool + per-goroutine defer release.
-//
-// Cannot run in parallel: variants share process-wide runtime.MemStats.
-//
-//nolint:paralleltest // measurement requires serial execution
-func TestPoolLifecycleDemo(t *testing.T) {
-	cfg := demoCfg()
-	const partUploadDelay = 500 * time.Millisecond // ~realistic GCS multipart part latency
-
-	for _, sz := range []int{256 << 20, 1 << 30} {
-		//nolint:paralleltest // measurement requires serial execution
-		t.Run(fmt.Sprintf("%dMiB", sz>>20), func(t *testing.T) {
-			t.Logf("input=%d MiB, frame=%d KiB, part=%d MiB, workers=%d, upload_delay=%v",
-				sz>>20, cfg.FrameSizeKB, cfg.MinPartSizeMB, cfg.FrameEncodeWorkers, partUploadDelay)
-
-			input := demoBuildInput(sz)
-
-			type variantResult struct {
-				name  string
-				stats demoStats
-				ft    *FrameTable
-				hash  [32]byte
-				dst   []byte
-			}
-
-			runVariant := func(name string, fn func(io.Reader, partUploader) (*FrameTable, [32]byte, error)) variantResult {
-				u := &slowUploader{partDelay: partUploadDelay}
-				var ft *FrameTable
-				var hash [32]byte
-				st := demoMeasure(func() {
-					var err error
-					ft, hash, err = fn(bytes.NewReader(input), u)
-					if err != nil {
-						t.Fatalf("%s: %v", name, err)
-					}
-				})
-
-				return variantResult{name: name, stats: st, ft: ft, hash: hash, dst: u.Assemble()}
-			}
-
-			mainR := runVariant("main         (no pool)         ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) {
-				return mainCompressStream(t.Context(), r, cfg, u, 4, nil)
-			})
-			tomasR := runVariant("tomas (PR #2863, checkpoint)  ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) {
-				return oldCompressStream(t.Context(), r, cfg, u, 4, nil)
-			})
-			branchR := runVariant("this branch (per-goroutine)   ", func(r io.Reader, u partUploader) (*FrameTable, [32]byte, error) {
-				return compressStream(t.Context(), r, cfg, u, 4, nil)
-			})
-
-			if mainR.hash != tomasR.hash || tomasR.hash != branchR.hash {
-				t.Errorf("hash mismatch: main=%x tomas=%x branch=%x", mainR.hash, tomasR.hash, branchR.hash)
-			}
-			if mainR.ft.NumFrames() != tomasR.ft.NumFrames() || tomasR.ft.NumFrames() != branchR.ft.NumFrames() {
-				t.Errorf("frame count mismatch: main=%d tomas=%d branch=%d",
-					mainR.ft.NumFrames(), tomasR.ft.NumFrames(), branchR.ft.NumFrames())
-			}
-			if !bytes.Equal(mainR.dst, tomasR.dst) || !bytes.Equal(tomasR.dst, branchR.dst) {
-				t.Errorf("uploaded payload mismatch across variants")
-			}
-
-			mib := func(b uint64) float64 { return float64(b) / (1 << 20) }
-			for _, v := range []variantResult{mainR, tomasR, branchR} {
-				t.Logf("%s: total_alloc=%7.1f MiB  mallocs=%6d  heap_inuse_after=%7.1f MiB",
-					v.name, mib(v.stats.totalAllocBytes), v.stats.mallocs, mib(v.stats.heapInUseAfter))
-			}
-			t.Logf("---")
-			t.Logf("tomas vs main:  total_alloc %+7.1f MiB  mallocs %+5d  heap_inuse %+7.1f MiB",
-				mib(tomasR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes),
-				int64(tomasR.stats.mallocs)-int64(mainR.stats.mallocs),
-				mib(tomasR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter))
-			t.Logf("branch vs main: total_alloc %+7.1f MiB  mallocs %+5d  heap_inuse %+7.1f MiB",
-				mib(branchR.stats.totalAllocBytes)-mib(mainR.stats.totalAllocBytes),
-				int64(branchR.stats.mallocs)-int64(mainR.stats.mallocs),
-				mib(branchR.stats.heapInUseAfter)-mib(mainR.stats.heapInUseAfter))
-			t.Logf("branch vs tomas:total_alloc %+7.1f MiB  mallocs %+5d  heap_inuse %+7.1f MiB",
-				mib(branchR.stats.totalAllocBytes)-mib(tomasR.stats.totalAllocBytes),
-				int64(branchR.stats.mallocs)-int64(tomasR.stats.mallocs),
-				mib(branchR.stats.heapInUseAfter)-mib(tomasR.stats.heapInUseAfter))
-		})
-	}
-}
\ No newline at end of file


Reviewed by Cursor Bugbot for commit 616cf63.

Comment thread packages/shared/pkg/storage/compress_upload_pool_demo_test.go Outdated

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: 616cf63082

ℹ️ About Codex in GitHub

Your team has set up Codex to review pull requests in this repo. Reviews are triggered when you

  • Open a pull request for review
  • Mark a draft as ready
  • Comment "@codex review".

If Codex has suggestions, it will comment; otherwise it will react with 👍.

Codex can also answer questions or update the PR. Try commenting "@codex address that feedback".

Comment thread packages/shared/pkg/storage/compress_upload_pool_demo_test.go Outdated
@ValentaTomas ValentaTomas requested a review from levb May 31, 2026 02:36

@chatgpt-codex-connector chatgpt-codex-connector Bot left a comment


💡 Codex Review

Here are some automated review suggestions for this pull request.

Reviewed commit: f2cf61349c


Comment thread packages/shared/pkg/storage/gcp_multipart.go
Restore Content-MD5 on GCS multipart uploads while keeping the pooled multi-slice reader path.
@ValentaTomas ValentaTomas dismissed levb’s stale review May 31, 2026 04:56

Rerequested review.

uncompressedData := (*bufPtr)[:n]

p.compress.Go(func() error {
	defer inputBufPool.Put(bufPtr)
Contributor


Let's not access inputBufPool from here at all. Keep that responsibility in the place where the Gets happen, so Get and Put stay together.

If we need an async callback, let's do that instead, or move the goroutine scheduling into the readLoop function (if possible).

This comment was marked as outdated.

Member Author


I'm not sure wrapping this in a callback makes sense. This is fundamentally just a free for the memory, and because it has to run asynchronously, putting a callback there does not feel like it clears up the lifecycle. It is the same call, but now it is also wrapped in a closure.

Contributor

@dobrac dobrac May 31, 2026


The goal is to keep the Get/Put pair next to each other, in one place. That way it's possible to verify that the buffer is freed once it stops being used.

Now it's spread across two functions, which makes it difficult to track.
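To make the callback option discussed in this thread concrete, here is a minimal sketch in which the read loop owns both `Get` and `Put`, and the frame worker only receives a `release` function. It is an illustration under assumptions, not the PR's implementation: `addFrame` and `readLoop` are simplified stand-ins with hypothetical signatures, `frameSize` is tiny on purpose, and the "compression" is replaced by a length count.

```go
package main

import (
	"fmt"
	"sync"
)

const frameSize = 4 // tiny on purpose; illustration only

// inputBufPool hands out *[]byte so that Put does not allocate.
var inputBufPool = sync.Pool{New: func() any {
	b := make([]byte, frameSize)

	return &b
}}

// addFrame (hypothetical, simplified) never touches the pool. It only calls
// release once the asynchronous work no longer needs data.
func addFrame(wg *sync.WaitGroup, data []byte, release func(), out chan<- int) {
	wg.Add(1)
	go func() {
		defer wg.Done()
		defer release()  // runs before wg.Done (defers are LIFO)
		out <- len(data) // stand-in for compressing the frame
	}()
}

// readLoop (hypothetical, simplified) keeps Get and Put in one place: the
// Put is written right next to the Get and merely executed later.
func readLoop(frames [][]byte) int {
	var wg sync.WaitGroup
	out := make(chan int, len(frames))

	for _, f := range frames {
		bufPtr := inputBufPool.Get().(*[]byte)
		release := func() { inputBufPool.Put(bufPtr) }

		n := copy(*bufPtr, f)
		addFrame(&wg, (*bufPtr)[:n], release, out)
	}

	wg.Wait()
	close(out)

	total := 0
	for n := range out {
		total += n
	}

	return total
}

func main() {
	fmt.Println(readLoop([][]byte{[]byte("ab"), []byte("cde")}))
}
```

The trade-off raised in the reply still applies: the `Put` is the same call either way, and the closure only changes where it is written, not when it runs.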
